package com.cloudansys.core.flink.sink;

import com.alibaba.fastjson.JSON;
import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.LiftDataEntity;
import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.CustomDateUtil;
import com.cloudansys.core.util.MqttUtil;
import com.cloudansys.core.util.SerializeUtil;
import com.cloudansys.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.eclipse.paho.client.mqttv3.MqttClient;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@Slf4j
public class RawMqttSink extends RichSinkFunction<List<MultiDataEntity>> {

    private static String topic_data;
    private static String topic_th_max;
    private static String topic_th_min;
    private static String projectId;
    private static MqttClient mqttClient;
    private static Jedis jedis;
    private Long lastTime = 0L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = DefaultConfig.getJedis();
        topic_data = DefaultConfig.get(Const.MQTT_TOPIC_DATA_ZD);
        topic_th_max = DefaultConfig.get(Const.MQTT_TOPIC_TH_MAX);
        topic_th_min = DefaultConfig.get(Const.MQTT_TOPIC_TH_MIN);
        mqttClient = DefaultConfig.getMqttClient();
        projectId = jedis.get(Const.PID);
    }

    /**
     * 把振动传感器原始监测数据推送到 mqtt
     */
    @Override
    public void invoke(List<MultiDataEntity> element, Context context) {
        // 推送阈值 打印阈值信息
        printZDTh(element);

        // 实时推送
        for (MultiDataEntity multiDataEntity : element) {
            // 推送数据
            sendToMqtt(multiDataEntity);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis != null) {
            jedis.close();
        }
        if (mqttClient != null) {
            mqttClient.disconnect();
            mqttClient.close();
        }
    }

    /**
     * 阈值推送
     */
    private void sendToMqtt(String serialCode) {
        // 获取振动阈值的最大值和最小值
        String maxTh = jedis.get(Const.ZD_TH_MAX + serialCode);
        String minTh = jedis.get(Const.ZD_TH_MIN + serialCode);
        String maxThTopic = StrUtil.stringsToStr(Const.SLASH, topic_th_max, projectId, serialCode);
        String minThTopic = StrUtil.stringsToStr(Const.SLASH, topic_th_min, projectId, serialCode);
        try {
            // 向 mqtt 推送数据
            MqttUtil.sendToMqtt(mqttClient, maxThTopic, maxTh);
            MqttUtil.sendToMqtt(mqttClient, minThTopic, minTh);
//            log.info("##### send to mqtt success #####");
        } catch (Exception e) {
            log.info("##### send to mqtt failed 【th】#####");
            e.printStackTrace();
        }
    }

    /**
     * 数据推送
     */
    private void sendToMqtt(MultiDataEntity multiDataEntity) {
        LiftDataEntity targetInfo = getTargetInfo(multiDataEntity);
        if (targetInfo == null) {
            return;
        }
        // topic
        String projectId = String.valueOf(targetInfo.getProjectId());
        String serialCode = targetInfo.getSerialCode();
        String topic = StrUtil.stringsToStr(Const.SLASH, topic_data, projectId, serialCode);
        // 数据
        String jsonString = JSON.toJSONString(targetInfo);
        try {
            // 向 mqtt 推送数据
            MqttUtil.sendToMqtt(mqttClient, topic, jsonString);
//            log.info("##### send to mqtt success #####");
        } catch (Exception e) {
            log.info("##### send to mqtt failed【data】#####");
            e.printStackTrace();
        }
    }

    /**
     * 获取 redis 缓存中对应的传感器信息
     * 【key -> hk:bi:tgt:projectId_serialCode】，
     */
    private LiftDataEntity getTargetInfo(MultiDataEntity multiDataEntity) {
        String projectId = multiDataEntity.getProjectId();
        String serialCode = multiDataEntity.getSerialCode();
        String pickTime = multiDataEntity.getPickTime();
        Double[] values = multiDataEntity.getValues();
        List<Double> valueList = Arrays.asList(values);
        if (valueList.contains(null)) {
            int index = valueList.indexOf(null);
            values[index] = 0.0;
        }
        // 获取 redis 缓存中对应的传感器信息
        String tgtKey = "hk:bi:tgt:" + projectId + Const.BAR_BOTTOM + serialCode;
        String s = jedis.get(tgtKey);
        if (s == null) {
            log.error("获取传感器基础信息失败！redisKey 为：{}", tgtKey);
            return null;
        }
        LiftDataEntity liftDataEntity = (LiftDataEntity) SerializeUtil.deserialize(s.getBytes());
        // 更新数据和数据时间
        liftDataEntity.setQuotaValues(values);
        liftDataEntity.setPickTime(CustomDateUtil.formatToStdTime(pickTime));
        return liftDataEntity;
    }

    /**
     * 打印振动阈值
     */
    private void printZDTh(List<MultiDataEntity> element) {
        long thisTime = System.currentTimeMillis();
        long diffTime = thisTime - lastTime;
        if (diffTime > 1000) {
            for (MultiDataEntity multiDataEntity : element) {
                String sc = multiDataEntity.getSerialCode();
                sendToMqtt(sc);
                List<Double> th = getZDThreshold(sc);
                log.info("====【sc:{}】【max：{}】【min：{}】====", sc, th.get(0), th.get(1));
            }
            lastTime = System.currentTimeMillis();
        }
    }

    /**
     * 获取设置的振动阈值
     */
    private List<Double> getZDThreshold(String sc) {
        List<Double> thList = new ArrayList<>();
        double maxTh = Double.parseDouble(jedis.get(Const.ZD_TH_MAX + sc));
        double minTh = Double.parseDouble(jedis.get(Const.ZD_TH_MIN + sc));
        thList.add(maxTh);
        thList.add(minTh);
        return thList;
    }

}
